{
 "metadata": {
  "name": "",
  "signature": "sha256:9bf5bb420871143a13d72c3c9e34cd1a82af88740c6ed476a176f2ae245d4ba6"
 },
 "nbformat": 3,
 "nbformat_minor": 0,
 "worksheets": [
  {
   "cells": [
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "<img src=http://continuum.io/media/img/continuum_analytics_logo.png align=\"right\" width=\"30%\">\n",
      "\n",
      "# Expression Chunking\n",
      "\n",
      "We evaluate computations out-of-core by breaking expressions on entire datasets into two pieces\n",
      "\n",
      "1.  What to do on each chunk of a dataset\n",
      "2.  What to do on the concatenation of computed chunks\n",
      "\n",
      "This works well with the computation includes some sort of reduction"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "### Simple example\n",
      "\n",
      "Given a long vector of integers we compute the sum of the vector by \n",
      "\n",
      "1.  Computing the sum of each 1000000 long chunk\n",
      "2.  Computing the sum of the aggregated results"
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "from blaze import *\n",
      "from blaze.expr.split import split\n",
      "\n",
      "x = Symbol('x', '1000000000 * int')\n",
      "chunk = Symbol('chunk', '1000000 * int')\n",
      "\n",
      "split(x, x.sum(), chunk=chunk)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "metadata": {},
       "output_type": "pyout",
       "prompt_number": 1,
       "text": [
        "((chunk, sum(chunk, keepdims=True)), (aggregate, sum(aggregate)))"
       ]
      }
     ],
     "prompt_number": 1
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "We reason about the shapes of all of the pieces so that intermediates may be preallocated"
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "(chunk, chunk_expr), (aggregate, aggregate_expr) = split(x, x.sum(), chunk=chunk)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 2
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "chunk_expr.dshape"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "metadata": {},
       "output_type": "pyout",
       "prompt_number": 3,
       "text": [
        "dshape(\"1 * int32\")"
       ]
      }
     ],
     "prompt_number": 3
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "aggregate.dshape"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "metadata": {},
       "output_type": "pyout",
       "prompt_number": 4,
       "text": [
        "dshape(\"1000 * int32\")"
       ]
      }
     ],
     "prompt_number": 4
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "## More complex cases\n",
      "\n",
      "Computing a chunked sum is simple.  Lets consider more complex cases"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "### Count\n",
      "\n",
      "When computing the count we count each chunk and then sum the results"
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "split(x, count(x), chunk=chunk)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "metadata": {},
       "output_type": "pyout",
       "prompt_number": 5,
       "text": [
        "((chunk, count(chunk, keepdims=True)), (aggregate, sum(aggregate)))"
       ]
      }
     ],
     "prompt_number": 5
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "split(x, x.std(), chunk=chunk)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "metadata": {},
       "output_type": "pyout",
       "prompt_number": 6,
       "text": [
        "((chunk,\n",
        "  summary(n=count(chunk), x=sum(chunk), x2=sum(chunk ** 2), keepdims=True)),\n",
        " (aggregate,\n",
        "  sqrt((sum(aggregate.x2) / (sum(aggregate.n) * 1.0)) - ((sum(aggregate.x) / (sum(aggregate.n) * 1.0)) ** 2))))"
       ]
      }
     ],
     "prompt_number": 6
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "\n",
      "### Split apply combine\n",
      "\n",
      "Split-apply-combine operations are also reductive"
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "t = Symbol('t', 'var * {name: string, amount: int}')"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 7
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "split(t, by(t.name, avg=t.amount.mean()))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "metadata": {},
       "output_type": "pyout",
       "prompt_number": 8,
       "text": [
        "((chunk,\n",
        "  by(chunk.name, avg_count=count(chunk.amount), avg_total=sum(chunk.amount))),\n",
        " (aggregate,\n",
        "  by(aggregate.name, avg=(1.0 * sum(aggregate.avg_total)) / sum(aggregate.avg_count))))"
       ]
      }
     ],
     "prompt_number": 8
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "We are smart enough to cut the expression at the right place, deciding which operations should occur on each chunk, and which must occur after the aggregation"
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "t2 = t[t.amount > 1000]"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 9
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "split(t, by(t2.name, total=t2.amount.sum() / 10))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "metadata": {},
       "output_type": "pyout",
       "prompt_number": 10,
       "text": [
        "((chunk,\n",
        "  by(chunk[chunk.amount > 1000].name, total=sum(chunk[chunk.amount > 1000].amount))),\n",
        " (aggregate, by(aggregate.name, total=sum(aggregate.total) / 10)))"
       ]
      }
     ],
     "prompt_number": 10
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "### N-Dimensions\n",
      "\n",
      "This works equally well in N-Dimensions "
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "x = Symbol('x', '1000000 * 2000000 * float64')\n",
      "chunk = Symbol('chunk', '1000 * 1000 * float64')"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 11
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "split(x, sum(2*x, axis=0), chunk=chunk)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "metadata": {},
       "output_type": "pyout",
       "prompt_number": 12,
       "text": [
        "((chunk, sum(2 * chunk, keepdims=True, axis=(0,))),\n",
        " (aggregate, sum(aggregate, axis=(0,))))"
       ]
      }
     ],
     "prompt_number": 12
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "The datashapes of the various stages of computation"
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "(chunk, chunk_expr), (agg, agg_expr) = split(x, sum(2*x, axis=0), chunk=chunk)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 13
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "chunk.dshape, chunk_expr.dshape, agg.dshape, agg_expr.dshape"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "metadata": {},
       "output_type": "pyout",
       "prompt_number": 14,
       "text": [
        "(dshape(\"1000 * 1000 * float64\"),\n",
        " dshape(\"1 * 1000 * float64\"),\n",
        " dshape(\"1000 * 2000000 * float64\"),\n",
        " dshape(\"2000000 * float64\"))"
       ]
      }
     ],
     "prompt_number": 14
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "### Practical Example: Combine NumPy and HDF5\n",
      "\n",
      "NumPy is a computationally rich in-memory container.  H5Py is a computationally poor on-disk container.  We use chunking to apply a numpy computational layer onto HDF5 files using chunking."
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "### Create a simple data set"
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "!rm foo.hdf5"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "rm: cannot remove \u2018foo.hdf5\u2019: No such file or directory\r\n"
       ]
      }
     ],
     "prompt_number": 15
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "import h5py\n",
      "\n",
      "f = h5py.File('foo.hdf5')"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 16
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "x = np.arange(20*24, dtype='f4').reshape((20, 24))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 17
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "d = f.create_dataset('/x', shape=x.shape, dtype=x.dtype,\n",
      "                           fillvalue=0.0, chunks=(4, 6))\n",
      "d[:] = x"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 18
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "d"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "metadata": {},
       "output_type": "pyout",
       "prompt_number": 19,
       "text": [
        "<HDF5 dataset \"x\": shape (20, 24), type \"<f4\">"
       ]
      }
     ],
     "prompt_number": 19
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "### Combining NumPy and HDF5"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "Consider the following expression in NumPy"
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "(x + 1).sum(axis=0)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "metadata": {},
       "output_type": "pyout",
       "prompt_number": 20,
       "text": [
        "array([ 4580.,  4600.,  4620.,  4640.,  4660.,  4680.,  4700.,  4720.,\n",
        "        4740.,  4760.,  4780.,  4800.,  4820.,  4840.,  4860.,  4880.,\n",
        "        4900.,  4920.,  4940.,  4960.,  4980.,  5000.,  5020.,  5040.], dtype=float32)"
       ]
      }
     ],
     "prompt_number": 20
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "We can't do this on `h5py` datasets"
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "(d + 1).sum(axis=0)  # d is an h5py dataset"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "ename": "TypeError",
       "evalue": "unsupported operand type(s) for +: 'Dataset' and 'int'",
       "output_type": "pyerr",
       "traceback": [
        "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m\n\u001b[1;31mTypeError\u001b[0m                                 Traceback (most recent call last)",
        "\u001b[1;32m<ipython-input-21-fcd5b640e7f4>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[1;32m----> 1\u001b[1;33m \u001b[1;33m(\u001b[0m\u001b[0md\u001b[0m \u001b[1;33m+\u001b[0m \u001b[1;36m1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msum\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0maxis\u001b[0m\u001b[1;33m=\u001b[0m\u001b[1;36m0\u001b[0m\u001b[1;33m)\u001b[0m  \u001b[1;31m# d is an h5py dataset\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m",
        "\u001b[1;31mTypeError\u001b[0m: unsupported operand type(s) for +: 'Dataset' and 'int'"
       ]
      }
     ],
     "prompt_number": 21
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "But if we wrap it in Blaze then we can do this computation via chunking"
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "b = Data(d)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 22
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "compute((b + 1).sum(axis=0))  # b is an h5py dataset wrapped by Blaze"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "metadata": {},
       "output_type": "pyout",
       "prompt_number": 23,
       "text": [
        "array([ 4580.,  4600.,  4620.,  4640.,  4660.,  4680.,  4700.,  4720.,\n",
        "        4740.,  4760.,  4780.,  4800.,  4820.,  4840.,  4860.,  4880.,\n",
        "        4900.,  4920.,  4940.,  4960.,  4980.,  5000.,  5020.,  5040.], dtype=float32)"
       ]
      }
     ],
     "prompt_number": 23
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "### Cleanup"
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "f.close()"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 24
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "!rm foo.hdf5"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 25
    }
   ],
   "metadata": {}
  }
 ]
}